{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"images/dask_horizontal.svg\" align=\"right\" width=\"30%\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Lazy execution"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we discuss some of the concepts behind dask, and lazy execution of code. You do not need to go through this material if you are eager to get on with the tutorial, but it may help understand the concepts underlying dask, how these things fit in with techniques you might already be using, and how to understand things that can go wrong."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prelude"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As Python programmers, you probably already perform certain *tricks* to enable computation of larger-than-memory datasets, parallel execution or delayed/background execution. Perhaps with this phrasing, it is not clear what we mean, but a few examples should make things clearer. The point of Dask is to make simple things easy and complex things possible!\n",
    "\n",
    "Aside from the [detailed introduction](http://dask.pydata.org/en/latest/), we can summarize the basics of Dask as follows:\n",
    "\n",
    "- process data that doesn't fit into memory by breaking it into blocks and specifying task chains\n",
    "- parallelize execution of tasks across cores and even nodes of a cluster\n",
    "- move computation to the data rather than the other way around, to minimize communication overhead\n",
    "\n",
    "All of this allows you to get the most out of your computation resources, but program in a way that is very familiar: for-loops to build basic tasks, Python iterators, and the NumPy (array) and Pandas (dataframe) functions for multi-dimensional or tabular data, respectively.\n",
    "\n",
    "The remainder of this notebook will take you through the first of these programming paradigms. This is more detail than some users will want, who can skip ahead to the iterator, array and dataframe sections; but there will be some data processing tasks that don't easily fit into those abstractions and need to fall back to the methods here.\n",
    "\n",
    "We include a few examples at the end of the notebooks showing that the ideas behind how Dask is built are not actually that novel, and experienced programmers will have met parts of the design in other situations before. Those examples are left for the interested."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Dask is a graph execution engine"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Dask allows you to construct a prescription for the calculation you want to carry out. That may sound strange, but a simple example will demonstrate that you can achieve this while programming with perfectly ordinary Python functions and for-loops. We saw this in the previous notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask import delayed\n",
    "\n",
    "@delayed\n",
    "def inc(x):\n",
    "    return x + 1\n",
    "\n",
    "@delayed\n",
    "def add(x, y):\n",
    "    return x + y"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we have used the delayed annotation to show that we want these functions to operate lazily — to save the set of inputs and execute only on demand. `dask.delayed` is also a function which can do this, without the annotation, leaving the original function unchanged, e.g.,\n",
    "```python\n",
    "    delayed_inc = delayed(inc)\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# this looks like ordinary code\n",
    "x = inc(15)\n",
    "y = inc(30)\n",
    "total = add(x, y)\n",
    "# x, y and total are all delayed objects. \n",
    "# They contain a prescription of how to carry out the computation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Calling a delayed function created a delayed object (`x, y, total`) which can be examined interactively. Making these objects is somewhat equivalent to constructs like the `lambda` or function wrappers (see below). Each holds a simple dictionary describing the task graph, a full specification of how to carry out the computation.\n",
    "\n",
    "We can visualize the chain of calculations that the object `total` corresponds to as follows; the circles are functions, rectangles are data/results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "total.visualize()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But so far, no functions have actually been executed. This demonstrated the division between the graph-creation part of Dask (`delayed()`, in this example) and the graph execution part of Dask.\n",
    "\n",
    "To run the \"graph\" in the visualization, and actually get a result, do:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# execute all tasks\n",
    "total.compute()"
   ]
  },
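  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see the laziness directly, here is a minimal sketch (not part of the original tutorial) in which a delayed function records each call in a list. The names `calls` and `record` are hypothetical, and the sketch assumes the default local scheduler, so that the task runs in the same process and can append to the list.\n",
    "\n",
    "```python\n",
    "from dask import delayed\n",
    "\n",
    "calls = []  # filled in only when the task actually runs\n",
    "\n",
    "@delayed\n",
    "def record(x):\n",
    "    calls.append(x)\n",
    "    return x + 1\n",
    "\n",
    "lazy = record(1)        # builds a one-task graph; the body of record has not run\n",
    "print(calls)            # still empty\n",
    "value = lazy.compute()  # executes the graph\n",
    "print(value, calls)\n",
    "```\n",
    "\n",
    "The list stays empty until `compute()` is called, which is the point at which graph creation hands over to graph execution."
   ]
  },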
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Why should you care about this?**\n",
    "\n",
    "By building a specification of the calculation we want to carry out before executing anything, we can pass the specification to an *execution engine* for evaluation. In the case of Dask, this execution engine could be running on many nodes of a cluster, so you have access to the full number of CPU cores and memory across all the machines. Dask will intelligently execute your calculation with care for minimizing the amount of data held in memory, while parallelizing over the tasks that make up a graph. Notice that in the animated diagram below, where four workers are processing the (simple) graph, execution progresses vertically up the branches first, so that intermediate results can be expunged before moving onto a new branch.\n",
    "\n",
    "With `delayed` and normal pythonic looped code, very complex graphs can be built up and passed on to Dask for execution. See a nice example of [simulated complex ETL](https://blog.dask.org/2017/01/24/dask-custom) work flow.\n",
    "\n",
    "![this](images/grid_search_schedule.gif)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will apply `delayed` to a real data processing task, albeit a simple one.\n",
    "\n",
    "Consider reading three CSV files with `pd.read_csv` and then measuring their total length. We will consider how you would do this with ordinary Python code, then build a graph for this process using delayed, and finally execute this graph using Dask, for a handy speed-up factor of more than two (there are only three inputs to parallelize over)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%run prep.py -d accounts"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import os\n",
    "filenames = [os.path.join('data', 'accounts.%d.csv' % i) for i in [0, 1, 2]]\n",
    "filenames"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "# normal, sequential code\n",
    "a = pd.read_csv(filenames[0])\n",
    "b = pd.read_csv(filenames[1])\n",
    "c = pd.read_csv(filenames[2])\n",
    "\n",
    "na = len(a)\n",
    "nb = len(b)\n",
    "nc = len(c)\n",
    "\n",
    "total = sum([na, nb, nc])\n",
    "print(total)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Your task is to recreate this graph again using the delayed function on the original Python code. The three functions you want to delay are `pd.read_csv`, `len` and `sum`.. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```python\n",
    "delayed_read_csv = delayed(pd.read_csv)\n",
    "a = delayed_read_csv(filenames[0])\n",
    "...\n",
    "\n",
    "total = ...\n",
    "\n",
    "# execute\n",
    "%time total.compute()   \n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# your verbose code here"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, repeat this using loops, rather than writing out all the variables."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# your concise code here"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "source_hidden": true
    }
   },
   "outputs": [],
   "source": [
    "## verbose version\n",
    "delayed_read_csv = delayed(pd.read_csv)\n",
    "a = delayed_read_csv(filenames[0])\n",
    "b = delayed_read_csv(filenames[1])\n",
    "c = delayed_read_csv(filenames[2])\n",
    "\n",
    "delayed_len = delayed(len)\n",
    "na = delayed_len(a)\n",
    "nb = delayed_len(b)\n",
    "nc = delayed_len(c)\n",
    "\n",
    "delayed_sum = delayed(sum)\n",
    "\n",
    "total = delayed_sum([na, nb, nc])\n",
    "%time print(total.compute())\n",
    "\n",
    "\n",
    "## concise version\n",
    "csvs = [delayed(pd.read_csv)(fn) for fn in filenames]\n",
    "lens = [delayed(len)(csv) for csv in csvs]\n",
    "total = delayed(sum)(lens)\n",
    "%time print(total.compute())\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Notes**\n",
    "\n",
    "Delayed objects support various operations:\n",
    "```python\n",
    "    x2 = x + 1\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "if `x` was a delayed result (like `total`, above), then so is `x2`. Supported operations include arithmetic operators, item or slice selection, attribute access and method calls - essentially anything that could be phrased as a `lambda` expression.\n",
    "\n",
    "Operations which are *not* supported include mutation, setter methods, iteration (for) and bool (predicate)."
   ]
  },
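  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a small illustration of these notes, the sketch below wraps a plain list with `delayed` and applies a few of the supported operations to it. The variable names are hypothetical; the point is only that each operation returns another delayed object, while truth testing raises an error.\n",
    "\n",
    "```python\n",
    "from dask import delayed\n",
    "\n",
    "x = delayed([1, 2, 3])   # wrap a plain value so that operations on it are lazy\n",
    "first = x[0] + 10        # item selection followed by arithmetic\n",
    "count = x.count(2)       # attribute access and method call\n",
    "print(first.compute(), count.compute())\n",
    "\n",
    "try:\n",
    "    if first:            # truth testing a delayed object is not supported\n",
    "        pass\n",
    "except TypeError as err:\n",
    "    print(\"TypeError:\", err)\n",
    "```\n",
    "\n",
    "If a decision depends on a delayed value, call `compute()` first and branch on the concrete result."
   ]
  },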
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Appendix: Further detail and examples"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following examples show that the kinds of things Dask does are not so far removed from normal Python programming when dealing with big data. These examples are **only meant for experts**, typical users can continue with the next notebook in the tutorial."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Example 1: simple word count"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This directory contains a file called `README.md`. How would you count the number of words in that file?\n",
    "\n",
    "The simplest approach would be to load all the data into memory, split on whitespace and count the number of results. Here we use a regular expression to split words."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import re\n",
    "splitter = re.compile('\\w+')\n",
    "with open('README.md', 'r') as f:\n",
    "    data = f.read()\n",
    "result = len(splitter.findall(data))\n",
    "result"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The trouble with this approach is that it does not scale - if the file is very large, it, and the generated list of words, might fill up memory. We can easily avoid that, because we only need a simple sum, and each line is totally independent of the others. Now we evaluate each piece of data and immediately free up the space again, so we could perform this on arbitrarily-large files. Note that there is often a trade-off between time-efficiency and memory footprint: the following uses very little memory, but may be slower for files that do not fill a large faction of memory. In general, one would like chunks small enough not to stress memory, but big enough for efficient use of the CPU."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result = 0\n",
    "with open('README.md', 'r') as f:\n",
    "    for line in f:\n",
    "        result += len(splitter.findall(line))\n",
    "result"
   ]
  },
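  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One way to move along that trade-off is to process several lines at a time rather than one. The following is a minimal sketch, not part of the original tutorial: the function `count_words` and its `chunk_lines` parameter are hypothetical, and an in-memory `io.StringIO` stands in for an open file so that the example is self-contained.\n",
    "\n",
    "```python\n",
    "import io\n",
    "import re\n",
    "from itertools import islice\n",
    "\n",
    "word = re.compile(r'\\w+')\n",
    "\n",
    "def count_words(lines, chunk_lines=1000):\n",
    "    \"\"\"Count words while holding at most `chunk_lines` lines in memory.\"\"\"\n",
    "    total = 0\n",
    "    while True:\n",
    "        chunk = list(islice(lines, chunk_lines))  # next batch of lines\n",
    "        if not chunk:\n",
    "            break\n",
    "        total += len(word.findall(''.join(chunk)))\n",
    "    return total\n",
    "\n",
    "# an in-memory stand-in for an open text file\n",
    "sample = io.StringIO(\"one two\\nthree\\nfour five six\\n\")\n",
    "print(count_words(sample, chunk_lines=2))\n",
    "```\n",
    "\n",
    "A larger `chunk_lines` means fewer, bigger regular-expression calls at the cost of more memory per step; a file object opened with `open(...)` can be passed in place of `sample`."
   ]
  },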
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Example 2: background execution"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "There are many tasks that take a while to complete, but don't actually require much of the CPU, for example anything that requires communication over a network, or input from a user. In typical sequential programming, execution would need to halt while the process completes, and then continue execution. That would be dreadful for user experience (imagine the slow progress bar that locks up the application and cannot be canceled), and wasteful of time (the CPU could have been doing useful work in the meantime).\n",
    "\n",
    "For example, we can launch processes and get their output as follows:\n",
    "```python\n",
    "    import subprocess\n",
    "    p = subprocess.Popen(command, stdout=subprocess.PIPE)\n",
    "    p.returncode\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The task is run in a separate process, and the return-code will remain `None` until it completes, when it will change to `0`. To get the result back, we need `out = p.communicate()[0]` (which would block if the process was not complete)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Similarly, we can launch Python processes and threads in the background. Some methods allow mapping over multiple inputs and gathering the results, more on that later.  The thread starts and the cell completes immediately, but the data associated with the download only appears in the queue object some time later."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Edit sources.py to configure source locations\n",
    "import sources\n",
    "sources.lazy_url"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import threading\n",
    "import queue\n",
    "import urllib\n",
    "\n",
    "def get_webdata(url, q):\n",
    "    u = urllib.request.urlopen(url)\n",
    "    # raise ValueError\n",
    "    q.put(u.read())\n",
    "\n",
    "q = queue.Queue()\n",
    "t = threading.Thread(target=get_webdata, args=(sources.lazy_url, q))\n",
    "t.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# fetch result back into this thread. If the worker thread is not done, this would wait.\n",
    "q.get()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Consider: what would you see if there had been an exception within the `get_webdata` function? You could uncomment the `raise` line, above, and re-execute the two cells. What happens? Is there any way to debug the execution to find the root cause of the error?"
   ]
  },
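  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For comparison, the standard library's `concurrent.futures` module is one of the methods that map over multiple inputs and gather the results. The sketch below is an illustration under stated assumptions rather than part of the original tutorial: `slow_square` is a hypothetical function, and `time.sleep` stands in for waiting on a network. It also shows that a future re-raises the worker's exception in the calling thread when its result is requested.\n",
    "\n",
    "```python\n",
    "import time\n",
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "def slow_square(x):\n",
    "    time.sleep(0.1)  # stand-in for waiting on a network or a user\n",
    "    if x < 0:\n",
    "        raise ValueError(\"negative input\")\n",
    "    return x * x\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=4) as pool:\n",
    "    # map over several inputs and gather the results in input order\n",
    "    squares = list(pool.map(slow_square, [1, 2, 3]))\n",
    "    # submit returns immediately with a future\n",
    "    failing = pool.submit(slow_square, -1)\n",
    "    try:\n",
    "        failing.result()  # blocks, then re-raises the worker's exception here\n",
    "    except ValueError as err:\n",
    "        message = str(err)\n",
    "\n",
    "print(squares, message)\n",
    "```\n",
    "\n",
    "Compare this with the bare thread and queue above, where nothing carries an exception back to the thread that is waiting on `q.get()`."
   ]
  },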
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Example 3: delayed execution"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "There are many ways in Python to specify the computation you want to execute, but only run it *later*."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def add(x, y):\n",
    "    return x + y\n",
    "\n",
    "# Sometimes we defer computations with strings\n",
    "x = 15\n",
    "y = 30\n",
    "z = \"add(x, y)\"\n",
    "eval(z)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# we can use lambda or other \"closure\"\n",
    "x = 15\n",
    "y = 30\n",
    "z = lambda: add(x, y)\n",
    "z()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# A very similar thing happens in functools.partial\n",
    "\n",
    "import functools\n",
    "z = functools.partial(add, x, y)\n",
    "z()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Python generators are delayed execution by default\n",
    "# Many Python functions expect such iterable objects\n",
    "\n",
    "def gen():\n",
    "    res = x\n",
    "    yield res\n",
    "    res += y\n",
    "    yield res\n",
    "\n",
    "g = gen()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# run once: we get one value and execution halts within the generator\n",
    "# run again and the execution completes\n",
    "next(g)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Dask graphs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Any Dask object, such as `total`, above, has an attribute which describes the calculations necessary to produce that result. Indeed, this is exactly the graph that we have been talking about, which can be visualized. We see that it is a simple dictionary, in which the keys are unique task identifiers, and the values are the functions and inputs for calculation.\n",
    "\n",
    "`delayed` is a handy mechanism for creating the Dask graph, but the adventurous may wish to play with the full fexibility afforded by building the graph dictionaries directly. Detailed information can be found [here](http://dask.pydata.org/en/latest/graphs.html)."
   ]
  },
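  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a minimal sketch of building a graph dictionary by hand, the example below reproduces the `inc`/`add` calculation from earlier in this notebook. It assumes the tuple-based task format described in the linked documentation, where a task is written as `(function, arg1, arg2, ...)` and an argument that matches a key refers to that task's result. The key names `\"x\"`, `\"y\"` and `\"total\"` are chosen here for illustration, and `inc` is a plain (undecorated) function.\n",
    "\n",
    "```python\n",
    "from operator import add\n",
    "from dask.threaded import get  # the threaded scheduler's entry point\n",
    "\n",
    "def inc(x):\n",
    "    return x + 1\n",
    "\n",
    "# keys name the tasks; values say which function to call on which inputs\n",
    "dsk = {\n",
    "    \"x\": (inc, 15),\n",
    "    \"y\": (inc, 30),\n",
    "    \"total\": (add, \"x\", \"y\"),\n",
    "}\n",
    "\n",
    "print(get(dsk, \"total\"))\n",
    "```\n",
    "\n",
    "Asking the scheduler for `\"total\"` runs the two `inc` tasks it depends on and then adds their results, which is the same calculation that `delayed` assembled automatically."
   ]
  },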
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "total.dask"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dict(total.dask)"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
